package com.shujia.flink.core;

import com.shujia.flink.core.function.Demo1WordCountFlatMap;
import com.shujia.flink.core.function.Demo1WordCountKeySelect;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1WordCountJava {
    public static void main(String[] args) throws Exception {
        //1、创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataStream<String> linesDS = env.socketTextStream("master", 8888);
        /*
         * 统计单词的数量
         */
        //一行转换成多行
        DataStream<Tuple2<String, Integer>> wordsDS = linesDS.flatMap(new Demo1WordCountFlatMap());

        //按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordsDS.keyBy(new Demo1WordCountKeySelect());

        //统计单词的数量
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        countDS.print();

        env.execute();
    }
}
